概述
本节为 PrismaModule 的 forRoot 方法添加数据库连接的断线重试和错误处理逻辑。通过 RxJS 的 defer、retry、catchError 和 lastValueFrom 组合实现自动重连,参考 @nestjs/mongoose 官方模块的实现方式。
扩展 Options 接口
在 PrismaModuleOptions 中添加重试和工厂函数相关属性:
// prisma/prisma-options.interface.ts
import { Prisma } from '@prisma/client';
export interface PrismaModuleOptions {
url?: string;
name?: string;
options?: Prisma.PrismaClientOptions;
// 重试配置
retryAttempts?: number; // 最大重试次数,-1 为无限重试
retryDelay?: number; // 重试间隔(毫秒),默认 3000
// 工厂函数
connectionFactory?: (connection: any, name: string) => any;
connectionErrorFactory?: (
error: Prisma.PrismaClientKnownRequestError,
) => Prisma.PrismaClientKnownRequestError;
}
typescript
工厂函数说明
| 函数 | 参数 | 返回值 | 作用 |
|---|---|---|---|
connectionFactory | client 实例、连接名称 | 修改后的 client | 允许用户自定义连接初始化逻辑 |
connectionErrorFactory | 错误对象 | 处理后的错误 | 允许用户自定义错误处理 |
安装 RxJS
虽然 NestJS 已内置 RxJS,但建议显式安装以锁定版本:
pnpm add rxjs
bash
Core Module 重试逻辑实现
核心流程
forRoot(options)
|
v
解构 retryAttempts, retryDelay, connectionFactory, connectionErrorFactory
|
v
定义 prismaConnectionFactory(根据 DB 类型选择 Client)
|
v
useFactory:
1. 创建 client 实例
2. defer(() => client.connect()) -- 延迟连接
3. .pipe(handleRetry(...)) -- 加入重试逻辑
4. .pipe(catchError(...)) -- 捕获最终错误
5. lastValueFrom(...) -- 转为 Promise
6. 返回连接成功的 client
text
完整实现
// prisma/prisma-core.module.ts
import { defer, lastValueFrom } from 'rxjs';
import { Logger } from '@nestjs/common';
export class PrismaCoreModule {
private static readonly logger = new Logger('PrismaModule');
static forRoot(options: PrismaModuleOptions): DynamicModule {
const {
retryAttempts = 10,
retryDelay = 3000,
connectionFactory,
connectionErrorFactory,
...restOptions
} = options;
// 默认错误处理工厂
const prismaConnectionErrorFactory =
connectionErrorFactory || ((error: any) => error);
// 连接工厂:根据 DB 类型选择 Client
const prismaConnectionFactory =
connectionFactory ||
async (clientOptions: any) => {
const dbType = getDBType(restOptions.url);
let prismaClient;
if (dbType === 'mysql') {
prismaClient = new MySQLClient(clientOptions);
} else {
prismaClient = new PGClient(clientOptions);
}
return prismaClient;
};
const providerName = restOptions.name || 'PRISMA_CLIENT';
const prismaClientProvider: Provider = {
provide: providerName,
useFactory: async () => {
const clientOptions = buildClientOptions(restOptions);
const client = await prismaConnectionFactory(clientOptions);
// 连接 + 重试 + 错误处理
return lastValueFrom(
defer(async () => {
await client.connect();
return client;
}).pipe(
handleRetry(retryAttempts, retryDelay),
catchError((error) =>
throwError(() => prismaConnectionErrorFactory(error)),
),
),
);
},
};
return {
module: PrismaCoreModule,
providers: [prismaClientProvider],
exports: [prismaClientProvider],
};
}
}
typescript
handleRetry 实现
// prisma/prisma.utils.ts
import { retry, timer, throwError, from, Observable } from 'rxjs';
import { Logger } from '@nestjs/common';
export function handleRetry(
retryAttempts: number,
retryDelay: number,
): (source: Observable<any>) => Observable<any> {
const logger = new Logger('PrismaModule');
return (source: Observable<any>) =>
source.pipe(
retry({
count: retryAttempts < 0 ? Infinity : retryAttempts,
delay: (error, retryCount) => {
const attempts = retryAttempts < 0 ? Infinity : retryAttempts;
if (retryCount <= attempts) {
logger.error(
`Unable to connect to the database. Retrying (${retryCount})...`,
);
return timer(retryDelay);
}
return throwError(() => error);
},
}),
);
}
typescript
关键细节
- retryAttempts < 0 -- 传入
Infinity实现无限重试 - delay 回调 -- 每次重试前先延迟
retryDelay毫秒 - 日志记录 -- 每次重试都记录日志,方便排查问题
无限重试的处理
retry 操作符的 count 参数接受 Infinity:
// 当 retryAttempts 为 -1 时
retry({
count: Infinity, // 无限重试
delay: (error, retryCount) => {
// 始终返回 timer,不会触发最终失败
return timer(retryDelay);
},
})
typescript
测试验证
测试无限重试
// app.module.ts
PrismaModule.forRoot({
url: 'postgresql://pg_user:example@localhost:5433/test_db',
name: 'prisma1',
retryAttempts: -1, // 无限重试
retryDelay: 3000, // 3 秒间隔
}),
typescript
控制台输出(数据库未启动时):
[PrismaModule] Unable to connect to the database. Retrying (1)...
[PrismaModule] Unable to connect to the database. Retrying (2)...
[PrismaModule] Unable to connect to the database. Retrying (3)...
...(持续重试直到数据库启动)
text
测试有限重试
PrismaModule.forRoot({
url: 'postgresql://pg_user:example@localhost:5433/test_db',
name: 'prisma1',
retryAttempts: 2, // 最多重试 2 次
retryDelay: 3000,
}),
typescript
控制台输出:
[PrismaModule] Unable to connect to the database. Retrying (1)...
[PrismaModule] Unable to connect to the database. Retrying (2)...
[Error] Can't reach database server at `localhost:5433`
text
重试耗尽后抛出最终错误。
测试正常连接
当数据库正常启动后,连接成功,请求正常响应:
GET /v1
-> 返回 PostgreSQL 数据
http
async/await 注意事项
在 useFactory 中使用 defer 时需要注意异步处理:
// 错误写法:缺少 await,client 为 Promise
useFactory: () => {
const client = prismaConnectionFactory(options);
return lastValueFrom(defer(() => client.connect()));
};
// 正确写法:加上 async/await
useFactory: async () => {
const client = await prismaConnectionFactory(options);
return lastValueFrom(
defer(async () => {
await client.connect();
return client;
}),
);
};
typescript
如果 connectionFactory 返回的是 Promise 而不使用 await,后续的 client.connect() 会报 cannot be used without new 错误。
后续方向
forRoot 的同步逻辑已完成,下一步是实现 forRootAsync 的异步版本。有了同步逻辑的基础,异步版本的主要区别在于:
useFactory改为接收外部注入的依赖- 通过
inject声明依赖项 - 可结合
ConfigService动态读取配置
关键要点
- RxJS 三件套 --
defer+retry+lastValueFrom是实现断线重连的核心组合 - Infinity 重试 --
retryAttempts传入Infinity实现无限重试,-1 是用户侧的约定 - 工厂函数模式 --
connectionFactory和connectionErrorFactory提供类似生命周期钩子的扩展点 - 参考官方实现 -- 整套逻辑参考
@nestjs/mongoose的mongoose-core.module.ts - async/await 一致性 --
useFactory中的异步操作必须使用async/await,避免 Promise 未等待的问题
↑